package kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaClient {

    public static final String TOPIC = "test-jian";
    public static final String KAFKA_SERVER = "10.5.200.89:9092";
    public static final String KAFKA_ZK = "10.5.200.89:2181";

    public static void main(String[] args) {
        new Thread(() -> produceData()).start();
//        new Thread(() -> consumeData()).start();
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
//        produceData();
//        consumeData();
    }

    public static void produceData() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", KAFKA_SERVER);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        // 此处无效，取决于服务端server.properties
        properties.put("auto.create.topics.enable", "true");
        properties.put("zk.connect", KAFKA_ZK);
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        int i = 0;
        for (; ; ) {
            i++;
            try {
                producer.send(new ProducerRecord<>(TOPIC, "hello-key" + i, "world-value"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //producer.close();
    }

    public static void consumeData() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_SERVER);
        properties.put("group.id", "consume-jian1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition:%d,offset=%d,key=%s,value=%s%n", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
